{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spark Magic\n",
    "\n",
    "BeakerX has a Spark magic that provides deeper integration with Spark.  It provides a GUI dialog for connecting to a cluster, a progress meter that shows how your job is progressing and links to the regular Spark UI, forwarding of kernel interrupt messages to the cluster so you can stop a job without leaving the notebook, and automatic display of Datasets in an interactive widget.  Finally, it automatically closes the Spark session when the notebook is closed.\n",
    "\n",
    "It is compatible with Spark version 2.x."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "b65f7a21-373b-4ac0-94c6-6da3d2a5bb24",
       "version_major": 2,
       "version_minor": 0
      },
      "method": "display_data"
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "%%classpath add mvn\n",
    "org.apache.spark spark-sql_2.12 2.4.4"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `%%spark` cell magic can be run by itself in a cell.  It produces a GUI dialog that you fill out to connect to your cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optionally, the contents of the cell can evaluate to a `SparkSession` builder whose settings fill in the default values for the GUI.  Only one Spark magic can be connected at a time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%spark\n",
    "SparkSession.builder()\n",
    "      .appName(\"BeakerX Demo\")\n",
    "      .master(\"local[4]\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can also provide a `--start` (or `-s`) option to automatically start a session with a cluster (or a local instance)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%spark --start\n",
    "SparkSession.builder().master(\"local[100]\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you have added JARs to the classpath of the Spark driver with the `%classpath` magic, they can be copied to the executors as follows.  We are looking into making this automatic and into supporting `spark.jars.packages`; see [#7498](https://github.com/twosigma/beakerx/issues/7498)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%spark\n",
    "val jars = ClasspathManager.getJars().toArray.mkString(\",\")\n",
    "SparkSession.builder().config(\"spark.jars\", jars)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After you start a session with a Spark cluster using one of the above configurations,\n",
    "code like the following runs in parallel without any additional annotation.\n",
    "A three-way progress widget appears automatically, showing how many tasks are waiting, running, and completed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
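    "// Estimate Pi by Monte Carlo: sample points in the unit square\n",
    "// and count how many fall inside the quarter circle of radius 1.\n",
    "val NUM_SAMPLES = 10000000\n",
    "val count = spark.sparkContext.parallelize(1 to NUM_SAMPLES).map { i =>\n",
    "  val x = math.random\n",
    "  val y = math.random\n",
    "  if (x*x + y*y < 1) 1 else 0\n",
    "}.reduce(_ + _)\n",
    "\n",
    "// The fraction of hits approximates Pi/4, so scale it by 4.\n",
    "println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)"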
   ]
  },
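  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a brief sketch of why the estimate above works, assuming `math.random` draws independent uniform values in $[0, 1)$: each sampled point $(x, y)$ lands inside the quarter of the unit circle with probability equal to that region's area, so, writing $N$ for `NUM_SAMPLES` (a symbol introduced here only for illustration),\n",
    "\n",
    "$$\n",
    "P(x^2 + y^2 < 1) = \\frac{\\pi}{4}, \\qquad \\pi \\approx \\frac{4 \\cdot \\mathrm{count}}{N}.\n",
    "$$\n",
    "\n",
    "Under the same assumption, the standard error of the estimate is $4\\sqrt{p(1-p)/N}$ with $p = \\pi/4$, which is about $5 \\times 10^{-4}$ for $N = 10^7$."
   ]
  },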
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By default, the Dataset preview shows only the columns and their types.  You can click a button to materialize ten rows."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val tornadoesPath = java.nio.file.Paths.get(\"../resources/data/tornadoes_2014.csv\").toAbsolutePath()\n",
    "\n",
    "val ds = spark.read.format(\"csv\").option(\"header\", \"true\").load(tornadoesPath.toString())\n",
    "ds"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Alternatively, you can call the `display` method to specify the number of rows to show."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ds.display(1000)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "",
   "name": "Scala",
   "nbconverter_exporter": "",
   "version": "2.12.8"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": false,
   "sideBar": false,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": false,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
